Langchain adapter updates and x402 client update #181
dixitaniket wants to merge 11 commits into main from
Conversation
Pull request overview
This PR updates the LangChain adapter and LLM client behavior to better support async/streaming usage, to automatically recover from a known x402 "Invalid payment required response" failure mode, and to refresh the dependency lockfiles.
Changes:
- Add “retry once after x402 stack reset” behavior for non-streaming and streaming LLM requests when the known invalid-payment error occurs.
- Expand the LangChain adapter to support client injection, async methods, streaming chunk conversion, and improved tool-call parsing/serialization.
- Update pinned dependency versions (including og-test-v2-x402==0.0.12.dev3) and refresh uv.lock.
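The "retry once after x402 stack reset" behavior described above can be sketched as follows. This is illustrative only: `chat_with_retry`, `send`, `reset`, and the matched error text are stand-ins, not the SDK's actual API.

```python
# Hedged sketch of the retry-once pattern: on the known x402 invalid-payment
# error, reset the x402 stack and retry exactly once; any second failure
# (or any other error) propagates to the caller.
import asyncio

X402_INVALID_PAYMENT = "Invalid payment required response"  # assumed error text

async def chat_with_retry(send, reset):
    """Call `send()` once; on the known x402 error, `reset()` and retry once."""
    try:
        return await send()
    except Exception as exc:
        if X402_INVALID_PAYMENT not in str(exc):
            raise                  # unrelated errors are not retried
        await reset()              # rebuild the x402 client stack
        return await send()        # a second failure propagates
```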
Reviewed changes
Copilot reviewed 6 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/opengradient/client/llm.py | Adds x402 stack reset + retry-once logic for completion/chat, including streaming retry. |
| src/opengradient/agents/og_langchain.py | Refactors the LangChain adapter for async paths, streaming support, tool handling, and client lifecycle. |
| src/opengradient/agents/__init__.py | Extends the langchain_adapter factory to accept client + connection overrides and new params. |
| tests/llm_test.py | Adds tests asserting a single retry + reset on the invalid-payment error (streaming and non-streaming). |
| tests/langchain_adapter_test.py | Adds coverage for injected client init, missing-key validation, identifying params, and async/stream paths. |
| pyproject.toml | Bumps the pinned og-test-v2-x402 dependency. |
| requirements.txt | Bumps the pinned og-test-v2-x402 dependency. |
| uv.lock | Updates locked versions (including opengradient metadata and og-test-v2-x402). |
```python
def _stream(
    self,
    messages: List[BaseMessage],
    stop: Optional[List[str]] = None,
    run_manager: Optional[CallbackManagerForLLMRun] = None,
    **kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
    sdk_messages = self._convert_messages_to_sdk(messages)
    chat_kwargs = self._build_chat_kwargs(sdk_messages, stop, stream=True, **kwargs)
    queue: Queue[Any] = Queue()

    def _runner() -> None:
        async def _run() -> None:
            stream = await self._llm.chat(**chat_kwargs)
            async for chunk in cast(AsyncIterator[StreamChunk], stream):
                queue.put(self._stream_chunk_to_generation(chunk))

        try:
            asyncio.run(_run())
        except BaseException as exc:  # noqa: BLE001
            queue.put(exc)
        finally:
            queue.put(_STREAM_END)

    thread = Thread(target=_runner, daemon=True)
    thread.start()
```
_stream always spawns a new thread and runs self._llm.chat() inside asyncio.run(). Since the SDK client’s underlying async HTTP client is created on the main thread, using it from another thread/event loop is not safe. Consider implementing sync streaming by driving an event loop in the current thread (e.g., create a dedicated loop and iterate the async generator via run_until_complete on __anext__()), or otherwise ensure the HTTP client is created/used within the same thread+loop.
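The suggested alternative (driving the async generator from the current thread via `run_until_complete` on `__anext__()`) could look roughly like the sketch below. `iterate_sync` and `make_stream` are hypothetical names, not part of the SDK; `make_stream` stands in for a coroutine such as `self._llm.chat(..., stream=True)` that resolves to an async iterator.

```python
# Sketch: consume an async stream synchronously on a dedicated event loop
# in the *current* thread, so loop-bound resources (e.g. an async HTTP
# client created here) are never touched from another thread.
import asyncio
from typing import AsyncIterator, Iterator

def iterate_sync(make_stream) -> Iterator:
    loop = asyncio.new_event_loop()
    try:
        # Resolve the coroutine into an async iterator on our private loop.
        stream: AsyncIterator = loop.run_until_complete(make_stream())
        while True:
            try:
                # Pull one chunk at a time by driving __anext__() to completion.
                yield loop.run_until_complete(stream.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()
```

Note this keeps everything on one thread, but it leaks the loop if the caller abandons the generator mid-stream; a production version would also want cleanup for that case.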
```python
):
    resolved_model_cid = model_cid or model
    if resolved_model_cid is None:
        raise ValueError("model_cid (or model) is required.")
```
OpenGradientChatModel now accepts model_cid / model as str, but the underlying SDK client expects models in the provider/model form (LLM.chat does model.split("/")[1]). Passing a plain model name like "gpt-5" would raise IndexError at runtime. Consider validating the string format here (or normalizing to the required format) and raising a clear ValueError when it’s not supported.
Suggested change:

```python
raise ValueError("model_cid (or model) is required.")
# When a plain string is provided, ensure it matches the expected "provider/model" format
if isinstance(resolved_model_cid, str) and "/" not in resolved_model_cid:
    raise ValueError(
        f"Invalid model identifier '{resolved_model_cid}'. "
        "Expected format 'provider/model', e.g. 'openai/gpt-4o'."
    )
```
```python
try:
    asyncio.get_running_loop()
except RuntimeError:
    return asyncio.run(coro)

queue: Queue[Any] = Queue(maxsize=1)

def _runner() -> None:
    try:
        queue.put(asyncio.run(coro))
    except BaseException as exc:  # noqa: BLE001
        queue.put(exc)

thread = Thread(target=_runner, daemon=True)
thread.start()
outcome = queue.get()
thread.join()

if isinstance(outcome, BaseException):
    raise outcome
return outcome
```
_run_coro_sync runs the coroutine in a new thread when a loop is already running. Because LLM holds an httpx.AsyncClient (via x402HttpxClient), running its async methods in a different thread/event loop is not thread-safe and can fail with “attached to a different loop” style errors. A safer approach is to either (a) raise a clear error instructing callers to use the async methods when a loop is running, or (b) schedule work onto the existing loop (without creating a second event loop) rather than using asyncio.run in another thread.
Suggested change (replacing the thread-based fallback above):

```python
"""Run a coroutine synchronously when no event loop is running.

If an event loop is already running in this thread, synchronous execution
is not supported because it may conflict with resources (such as HTTP
clients) that are bound to the existing loop. In that case, callers must
use the async APIs directly.
"""
try:
    # Raises RuntimeError if no loop is running in this thread.
    asyncio.get_running_loop()
except RuntimeError:
    # Safe to create and run a new event loop.
    return asyncio.run(coro)

# An event loop is already running; do not create a second loop in
# another thread, as this is not safe for loop-bound resources.
raise RuntimeError(
    "Cannot run coroutine synchronously while an event loop is running. "
    "Use the async methods of OpenGradientChatModel instead."
)
```
Pull request overview
Copilot reviewed 7 out of 9 changed files in this pull request and generated 4 comments.
```diff
 "openai>=1.58.1",
 "pydantic>=2.9.2",
-"og-test-v2-x402==0.0.11"
+"og-x402==0.0.1.dev2"
```
```python
def _build_chat_kwargs(self, sdk_messages: List[Dict[str, Any]], stop: Optional[List[str]], stream: bool, **kwargs: Any) -> Dict[str, Any]:
    x402_settlement_mode = kwargs.get("x402_settlement_mode", self.x402_settlement_mode)
    if isinstance(x402_settlement_mode, str):
        x402_settlement_mode = x402SettlementMode(x402_settlement_mode)
    model = kwargs.get("model", self.model_cid)
    model = _validate_model_string(model)

    return {
        "model": model,
        "messages": sdk_messages,
        "stop_sequence": stop,
        "max_tokens": kwargs.get("max_tokens", self.max_tokens),
        "temperature": kwargs.get("temperature", self.temperature),
        "tools": kwargs.get("tools", self._tools),
        "tool_choice": kwargs.get("tool_choice", self._tool_choice),
        "x402_settlement_mode": x402_settlement_mode,
        "stream": stream,
    }
```
```python
# self-hosted TEE servers commonly use self-signed certificates.
verify_ssl = llm_server_url is None
self._tls_verify: Union[ssl.SSLContext, bool] = ssl_ctx if ssl_ctx else verify_ssl
self._reset_lock = threading.Lock()
```
A threading.Lock is incompatible with async coroutines; it only coordinates across threads, and holding it blocks the entire event-loop thread. Consider asyncio.Lock here.
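For illustration, an `asyncio.Lock` held with `async with` suspends only the waiting coroutine, while other coroutines on the loop keep running. `TeeResetter` below is a hypothetical stand-in for the class under review, not the SDK's actual code.

```python
# Sketch: guarding an async reset path with asyncio.Lock instead of
# threading.Lock, so concurrent coroutines serialize without blocking
# the event-loop thread.
import asyncio

class TeeResetter:
    """Illustrative only; not the SDK's actual class."""

    def __init__(self) -> None:
        # asyncio.Lock suspends only the waiting coroutine; a
        # threading.Lock would stall the whole event loop while held.
        self._reset_lock = asyncio.Lock()

    async def refresh_tee_and_reset(self) -> str:
        async with self._reset_lock:
            await asyncio.sleep(0)  # stand-in for the real reset work
            return "reset done"
```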
```python
        return False

    @staticmethod
    def _is_ssl_error(exc: Exception) -> bool:
```
Is this really necessary? Why do we need this complex SSL error handling now?
```python
    operation_name,
    first_error,
)
await self._refresh_tee_and_reset()
```
Why do we have to rebuild the x402 stack on an exception?